RabbitMQ — 为什么错误的订阅者会收到已发布的消息?

RabbitMQ — Why does the wrong subscriber get a published message?

我有两个服务,ManagerCollector

  1. Manager 使用 routingKey user.collected 订阅了队列 COLLECTED_USER 并调用了 UserCollected 处理程序。
  2. Collector 使用 routingKey user.collect 订阅了 Queue COLLECT_USER 并调用了 CollectUser 处理程序。

可以有多个收集器,所以我将 exclusive 设置为 false(代码见下文)。

还有其他服务可以侦听

等事件

此外还有监听更多一般事件的服务,例如

等等。

所以我正在使用 topic 交换。

设置

| exchange | type  | routingKey     | queueName      |
| -------- | ----- | -------------- | -------------  |
| MY_APP   | topic | user.collect   | COLLECT_USER   |
| MY_APP   | topic | user.collected | COLLECTED_USER |

应该发生什么:

  1. Manager 使用 routingKey user.collect
  2. 发布消息
  3. Collector 获取 user.collect 消息并调用 CollectUser 处理程序
  4. Collector 的 CollectUser 处理程序确实有效,然后使用 routingKey user.collected
  5. 发布消息
  6. Manager 获取 user.collected 消息并调用 UserCollected 处理程序

实际发生了什么:

  1. Manager 使用 routingKey user.collect 发布消息(正确)
  2. Collector 获取 user.collect 消息并调用 CollectUser 处理程序(正确)
  3. Manager 也收到 user.collect 消息并使用错误数据调用 UserCollected 处理程序。 (错误)
  4. Collector 的 CollectUser 处理程序确实有效,然后使用 routingKey user.collected(正确)
  5. 发布消息
  6. Manager 获取 user.collected 消息并调用 UserCollected 处理程序(正确)

我的问题

为什么 Manager 收到 user.collect 消息,给定:

  1. 它正在侦听 COLLECTED_USER 队列而不是 COLLECT_USER 队列,并且
  2. 正在侦听 COLLECT_USER 队列的 收集器 已经处理了消息。

实施细节

我按如下方式创建订阅者和发布者(针对相关性进行了修剪)

创建订阅者

给定 AMQP url 和参数 urlexchangetyperoutingKeyqueueNamehandler

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)

创建发布者

给定 AMQP url 和参数 urlexchangetype

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })

发布

给定 channel 和参数 exchangeroutingKeymessage

await channel.publish(exchange, routingKey, message)

备注

这个问题是 RabbitMQ 的后续问题——为什么我的路由键在使用主题交换时被忽略 .

我终于弄明白我的问题是什么了。肮脏的交换。在对此进行试验时,我无意中添加了一个将消息路由到错误队列的交换器,这让我感到困惑。

为了修复它,我启动了 RabbitMQ 管理 GUI 并删除了所有队列,让我的代码创建它需要的队列。上面概述的代码没有问题。